package com.boat.starter.pulsar.core;

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

import com.boat.starter.pulsar.common.PulsarMessage;
import com.boat.starter.pulsar.properties.PulsarProperties;
import com.boat.starter.pulsar.support.JSON2Schema;
import com.boat.starter.pulsar.support.SnowFlakeIdGenerator;

import javax.ws.rs.NotSupportedException;

/**
 * Description: Consumer factory. Creates pre-configured Pulsar {@link ConsumerBuilder}
 * instances for {@link PulsarMessage} payloads.
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/21-16:02
 */
public class PulsarConsumerFactory {

    private static final Logger log = LoggerFactory.getLogger(PulsarConsumerFactory.class);

    /** Consumer name format: DEFAULT_CONSUMER_{env}_{topic} */
    private static final String CONSUMER_NAME_PATTERN = "DEFAULT_CONSUMER_%s_%s";

    /** Environment property that supplies the {env} part of the consumer name. */
    private static final String APP_ENV_PROPERTY = "app.env";

    /** Value used for {env} when {@link #APP_ENV_PROPERTY} is missing or empty. */
    private static final String DEFAULT_APP_ENV = "dev";

    private final PulsarClient client;
    private final Environment environment;
    private final PulsarProperties pulsarProperties;

    /** Source of the numeric suffix appended to every consumer name built by {@link #createBuilder(String)}. */
    private final SnowFlakeIdGenerator idGenerator = new SnowFlakeIdGenerator(20, 20);

    private final PulsarTopicFactory topicFactory;

    /**
     * Not supported: a factory must be created through
     * {@link #PulsarConsumerFactory(PulsarClient, Environment, PulsarProperties)}.
     *
     * @throws UnsupportedOperationException always
     */
    private PulsarConsumerFactory() {
        throw new UnsupportedOperationException("Not Support");
    }

    /**
     * Creates a factory bound to the given client, environment and properties.
     * A {@link PulsarTopicFactory} is built from {@code pulsarProperties} and reused
     * for every builder this factory creates.
     *
     * @param client           Pulsar client used to create consumer builders
     * @param environment      Spring environment consulted for the {@code app.env} property
     * @param pulsarProperties properties handed to the {@link PulsarTopicFactory}
     */
    public PulsarConsumerFactory(PulsarClient client, Environment environment, PulsarProperties pulsarProperties) {
        this.client = client;
        this.environment = environment;
        this.pulsarProperties = pulsarProperties;
        this.topicFactory = new PulsarTopicFactory(pulsarProperties);
    }

    /**
     * Creates a consumer builder for subscribing to the given topic.
     *
     * <p>The builder uses the JSON schema for {@link PulsarMessage}, a consumer name of
     * {@code getConsumerName(topic) + "_" + <generated id>}, and the full topic name
     * resolved by the {@link PulsarTopicFactory}.</p>
     *
     * @param topic topic
     * @return a {@code ConsumerBuilder<PulsarMessage>} with consumer name and topic already set
     */
    public ConsumerBuilder<PulsarMessage> createBuilder(String topic) {
        // NOTE(review): the generated suffix presumably keeps names distinct per builder — confirm.
        String consumerName = getConsumerName(topic) + "_" + idGenerator.generate();
        return client.newConsumer(JSON2Schema.of(PulsarMessage.class))
                .consumerName(consumerName)
                .topic(topicFactory.obtainFullTopic(topic));
    }

    /**
     * Builds the base consumer name {@code DEFAULT_CONSUMER_{env}_{topic}}.
     *
     * <p>{@code env} is read from the {@code app.env} property; when that property is
     * {@code null} or empty, {@code "dev"} is used instead.</p>
     *
     * @param topic topic inserted verbatim into the name
     * @return the formatted consumer name (without the generated id suffix)
     */
    public String getConsumerName(String topic) {
        String appEnv = environment.getProperty(APP_ENV_PROPERTY);
        if (!StringUtils.hasLength(appEnv)) {
            appEnv = DEFAULT_APP_ENV;
        }
        return String.format(CONSUMER_NAME_PATTERN, appEnv, topic);
    }

    /**
     * @return the topic factory created from this factory's properties
     */
    public PulsarTopicFactory getTopicFactory() {
        return topicFactory;
    }

    /**
     * @return the properties this factory was constructed with
     */
    public PulsarProperties getPulsarProperties() {
        return pulsarProperties;
    }
}
